Skip to main content 집밥서선생

Golang으로 WebRTC Mesh Server 구현하기

Published: 2023-02-18

Golang으로 하는 첫 프로젝트로, WebRTC many-to-many 시그널링 서버를 만들게 되었다.



Basic 1:1 Signaling

우선 가장 기본적인 예제인, 1:1 연결 시그널링 서버의 예제부터 구현하였다. 1:1 시그널링 서버는 언어를 막론하고 예제가 많이 존재한다. 본인 또한 참고한 예제가 있다. https://github.com/shanet/WebRTC-Example의 예제를 확인하면 간단한 웹 클라이언트와, Node.js 및 WebSocket 기반의 간단한 시그널링 서버를 찾아볼 수 있다.


서버쪽 소스코드는 너무나도 간단하다. ICE나 SDP 등 WebRTC 연결 정보를 전달해주는 역할을 하는 코드는 이 정도밖에 안된다.

const wss = new WebSocketServer({ server: httpsServer });

wss.on('connection', function (ws) {
	ws.on('message', function (message) {
		// Broadcast any received message to all clients
		console.log('received: %s', message);
		wss.broadcast(message);
	});
});

wss.broadcast = function (data) {
	this.clients.forEach(function (client) {
		if (client.readyState === WebSocket.OPEN) {
			client.send(data);
		}
	});
};

한쪽 피어가 보낸 메시지를 다른 피어한테 웹소켓을 통해 전송해주는 로직으로, 사실상 채팅 서버와 거의 동일하다고 볼 수 있다. 그래서 Go로 작성된 웹소켓 기반 채팅서버 예제를 참조하여 금방 시그널링 서버를 구현할 수 있었다. https://github.com/jos-/gofiber-websocket-chat-example의 예제를 참조하여 동일 클라이언트 코드를 사용하는 Go 예제로 옮겨보았다.

프로젝트는 GoFiber웹소켓 extension을 사용하여 진행하였다. 웹소켓 관련 Go 라이브러리 중에서 가장 일반적으로 사용되는 게 Gorilla WebSocket으로 알고 있는데, 2022년 12월 쯤 Gorilla WebSocket의 깃허브 레포지토리가 archived 상태가 되었다. 괜찮은 대체제를 찾는 겸, Fiber 연습하는 겸 GoFiber WebSocket extension를 사용하게 되었다.


먼저, 다음과 같이 유저가 접속하거나 나갈 때 데이터를 전송할 채널과, 채널에 들어온 데이터를 받아서 처리할 고루틴을 작성해주었다.

type channelSet struct {
	register   chan *ws.Conn
	unregister chan *ws.Conn
	broadcast  chan string
}
type client struct{}

var clients = make(map[*ws.Conn]client)

func sendBroadcastMessage(cs *channelSet, message string) {
	for c := range clients {
		if err := c.WriteMessage(ws.TextMessage, []byte(message)); err != nil {
			fmt.Println("write error: err")
			c.WriteMessage(ws.CloseMessage, []byte{})
			c.Close()
		}
	}
}

func userHub(cs *channelSet) {
	for {
		select {
		case registerUser := <-cs.register:
			clients[registerUser] = client{}
			fmt.Println("new client is connected")

		case unregisterUser := <-cs.unregister:
			delete(clients, unregisterUser)
			fmt.Println("connection terminated")
		}
	}
}

func messageHub(cs *channelSet) {
	for {
		select {
		case message := <-cs.broadcast:
			fmt.Println("message received")
			sendBroadcastMessage(cs, message)
		}
	}
}

func main() {
	var channels = channelSet{
		register:   make(chan *ws.Conn),
		unregister: make(chan *ws.Conn),
		broadcast:  make(chan string),
	}

	go userHub(&channels)
	go messageHub(&channels)

	...
}

각각 register 채널에 유저 정보가 입력되면 유저를 전역변수 clients에 추가하고, unregister 채널에 입력되면 유저를 제거한다. broadcast 채널에 메시지가 입력되면 메시지를 모든 유저에게 전달한다.

*ws.Conn 타입의 인스턴스는 각 웹소켓 클라이언트에 해당하는 것으로, *ws.ConnReadMessage() 등을 호출하여 데이터를 읽거나 쓸 수 있다.


유저의 웹소켓 세션에 관련된 부분은 다음과 같다.

func main() {
	...

	app.Get("/ws/",
			func(c *fiber.Ctx) error {
				if ws.IsWebSocketUpgrade(c) {
					c.Next()
				}
				return nil
			},
			ws.New(func(c *ws.Conn) {
				defer func() {
					channels.unregister <- c
					c.Close()
				}()

				channels.register <- c

				for {
					messageType, message, err := c.ReadMessage()
					if err != nil {
						if ws.IsUnexpectedCloseError(err, ws.CloseGoingAway, ws.CloseAbnormalClosure) {
							fmt.Println("read error:", err)
						}
						return
					}

					if messageType == ws.TextMessage {
						channels.broadcast <- string(message)
					} else {
						fmt.Println("websocket message recived of type", messageType)
					}
				}
			}))
}

먼저 웹소켓을 사용할 수 있는지 검사한 후, 사용 가능하다면 웹소켓 연결을 생성한다. 웹소켓 연결이 생성되면 register 채널에 클라이언트 정보를 보내고, defer문을 통해 클라이언트가 웹소켓 연결을 끊으면 unregister 채널에 클라이언트 정보를 보낼 수 있게끔 하였다.

1:1 연결에서는 이러한 100줄 정도의 코드만으로도 시그널링 서버를 구현할 수 있다.


전체 코드는 https://github.com/jhseoeo/webrtc-mesh-server/tree/5d435d745a48ba3be0c1299163029364ec31a3bf에서 찾아볼 수 있다.



개선할 점

일단 M:M으로 나아가기에 앞서, 위 예제에서 몇 가지 문제점을 찾아볼 수 있다.

  • 서버가 1:1 연결 하나만 처리할 수 있음… 세션 정도는 만들어줄 필요가 있음.
  • 클라이언트 코드도 마찬가지로 상대방이 한 명이라고 가정하여 작성되어 있음. 여러 명의 유저와 통신할 수 있게끔 수정해야 함.
  • 유저가 나갈 때에 대한 처리가 부족함. 한 유저가 나가면 RTCPeerConnection을 통해 유저가 나갔다는 사실을 전달받을 수 있겠지만, 그걸 통해 <video> element를 지운다던지 하는 처리가 전혀 없음.
  • 고루틴을 여러 개 띄워서 전역 변수인 client에 읽기/쓰기를 시도하는 반면, client는 뮤텍스 등을 통해 보호되지 않음. 이 때문에 새로고침을 막 눌러서 연결을 빡세게 시도하면 data race로 인한 panic이 일어나서 서버가 곧잘 죽는 것을 확인할 수 있음
  • 모든 컴포넌트가 main.go 한 파일에 몰려있음. 리팩토링을 통해 디커플링하는 겸 변수 이름도 Convention에 맞게 다시 지어주면 좋을 듯 함.

또한 Mesh 형태의 many-to-many 연결을 구현하기 위해서는 시그널링 서버에서 특정 유저한테만 메시지를 보낼 수 있는 방법이 추가되어야 한다. WebRTC 연결을 생성하려는 대상이 아닌 다른 대상에게까지 메시지가 전달되면 오류가 발생하여 연결 설정이 제대로 안될 수 있기 때문이다.

다음으로 다룰 예제에서 위 사항을 충족시킨 코드를 확인할 수 있을 것이다.



Many-to-Many Mesh Server

main.go의 변경사항이다.

func main() {
	hub := CreateHub()

	...

	app.Get("/ws/:session",
		func(c *fiber.Ctx) error { // check if a client can establish websocket connection
			if ws.IsWebSocketUpgrade(c) {
				c.Next()
			}
			return nil
		},
		ws.New(func(conn *ws.Conn) {
			WebsocketConnectionLoop(clientDataStore, hub, conn)
		}))
}

우선, 이전에 전역변수로 선언되던 clients는 각 세션별로 존재하며, 각 세션은 Hub에서 만들어진다.
또한 클라이언트별 웹소켓 루프를 별도의 함수로 지정하였다.


웹소켓 루프 함수를 살펴보자.

// Websocket Session Loop for each client
func WebsocketConnectionLoop(hub *MessageHub, conn *ws.Conn) {
	session := SessionName(conn.Params("session"))

	uuid, err := getUUID(conn)
	if err != nil {
		fmt.Println("an error occurred getting uuid:", err)
		conn.Close()
	}

	fmt.Printf("user %s joined on %s\n", uuid, session)

	client := Client{Conn: conn}
	hub.RegisterUser(session, uuid, client) // add current user's information to users list

	defer func() { // when user leaves
		fmt.Printf("user %s leaved from %s\n", uuid, session)
		hub.UnregisterUser(session, uuid, client) // delete current user's information from users list
		conn.Close()
	}()

	for {
		var messageData MessageData
		err := conn.ReadJSON(&messageData)

		if err != nil {
			if ws.IsUnexpectedCloseError(err, ws.CloseGoingAway, ws.CloseAbnormalClosure) {
				fmt.Println("read error:", err)
			}
			return
		}

		hub.SendSignallingMessage(session, uuid, messageData)
	}
}

클라이언트가 접속하면 자신의 UUID를 보낸다. 클라이언트가 접속한 세션 정보와 이 UUID 정보를 바탕으로 RegisterUser() 메소드를 호출하면 클라이언트가 세션에 등록된다. 클라이언트가 접속을 종료할 때는 UnregisterUser()를 호출하여 유저의 퇴장을 처리한다.

이후, ICE나 SDP 등 시그널링 메시지를 처리하는 단계로 진입한다. SendSignallingMessage() 메소드를 통해 UUID로 구분하여 특정 대상에게 시그널링 메시지를 전달할 수 있다.


다음으로, SessionDataStore에 대해 살펴보자.

// Client struct type. you can add any data here
// Client struct type. you can add any data here
type Client struct {
	Conn *ws.Conn
}

// Datastore of client
type SessionDataStore struct {
	mutex     sync.RWMutex
	dataStore map[UUIDType]Client
}

// Create new datastore
func MakeSessionDataStore() *SessionDataStore {
	return &SessionDataStore{
		dataStore: make(map[UUIDType]Client),
	}
}

SessionDataStore는 세션별로 유저들의 정보가 저장되는 자료구조로, GetSessionData(), SetUserData(), DeleteUserData() 메소드를 통해 접근할 수 있다. RWMutex를 설정하여 여러 고루틴에서 동시에 접근하여 생길 수 있는 Data race 문제를 해결하였다.


채널을 관리하는 Hub쪽을 살펴보자.

...

// Message data protocol
type MessageData struct {
	Type    string      `json:"type"`
	Data    interface{} `json:"data"`
	SrcUUID UUIDType    `json:"srcuuid"`
	DstUUID UUIDType    `json:"dstuuid"`
}

type ChannelSet struct {
	register      chan UserInfo
	unregister    chan UserInfo
	deleteSession chan bool
	broadcast     chan MessageInfo
	signaling     chan MessageInfo
}

// Set of channels
type Hub struct {
	mutex    sync.RWMutex
	channels map[SessionName]ChannelSet
}

func CreateHub() *Hub {
	hub := Hub{
		channels: make(map[SessionName]ChannelSet),
	}

	return &hub
}

func (h *Hub) SendBroadcastMessage(session SessionName, uuid UUIDType, message MessageData) {
	h.mutex.RLock()
	defer h.mutex.RUnlock()

	h.channels[session].broadcast <- MessageInfo{session, message}
}

func (h *Hub) SendSignallingMessage(session SessionName, uuid UUIDType, message MessageData) {
	h.mutex.RLock()
	defer h.mutex.RUnlock()

	h.channels[session].signaling <- MessageInfo{session, message}
}

Hub는 각 세션 별로 채널을 만들고 관리하는 역할이다. Hub의 메소드는 기본적으로 호출이 되면 유저의 세션에 따라 대응되는 채널에 정보를 집어넣는다. 이 채널은 각 세션별 고루틴으로 연결되어, 해당 고루틴에서 처리된다.


func (h *Hub) RegisterUser(session SessionName, uuid UUIDType, client Client) error {
	if _, ok := h.channels[session]; !ok {
		channelSet, err := RunSessionLoop()
		if err != nil {
			return err
		}

		h.mutex.Lock()
		h.channels[session] = *channelSet
		h.mutex.Unlock()
	}

	h.mutex.RLock()
	h.channels[session].register <- UserInfo{session, uuid, client}
	h.mutex.RUnlock()

	return nil
}

func (h *Hub) UnregisterUser(session SessionName, uuid UUIDType, client Client) {
	h.mutex.RLock()
	h.channels[session].unregister <- UserInfo{session, uuid, client}
	toDetete := <-h.channels[session].deleteSession
	h.mutex.RUnlock()

	if toDetete {
		h.mutex.Lock()
		delete(h.channels, session)
		h.mutex.Unlock()
	}
}

RegisterUser()에서는 채널에 등록할 유저 정보를을 넣기에 앞서 채널이 있는지 검사하여, 없다면 각 세션의 고루틴 및 채널을 생성한다. UnregisterUser()에서는 세션에 더이상 유저가 없는 경우 채널을 삭제하는 로직까지 추가된다.


마지막으로 각 세션의 고루틴을 실행하는 RunSessionLoop() 함수를 살펴보자.

func RunSessionLoop() (*ChannelSet, error) {
	channelSet := ChannelSet{
		register:      make(chan UserInfo),
		unregister:    make(chan UserInfo),
		deleteSession: make(chan bool),
		broadcast:     make(chan MessageInfo),
		signaling:     make(chan MessageInfo),
	}

	clients := MakeSessionDataStore()

	go func() {
	loop:
		for {
			select {
			case registerUser := <-channelSet.register:
				err := handleUserRegister(clients, registerUser)
				if err != nil {
					fmt.Println("an error occurred while handling user registration, but still process :", err)
				}

			case unregisterUser := <-channelSet.unregister:
				err := handleUserUnregister(clients, unregisterUser)
				if err != nil {
					fmt.Println("an error occurred while handling user unregistration, but still process :", err)
				}

				if clients.IsEmpty(unregisterUser.Session) {
					channelSet.deleteSession <- true
					break loop
				} else {
					channelSet.deleteSession <- false
				}

			case messageData := <-channelSet.broadcast:
				err := sendBroadcastMessage(clients, messageData)
				if err != nil {
					fmt.Println("an error occurred while sending broadcast message, but still process :", err)
				}

			case messageData := <-channelSet.signaling:
				err := sendSignalingMessage(clients, messageData)
				if err != nil {
					fmt.Println("an error occurred while sending signaling message, but still process :", err)
				}
			}
		}
	}()

	return &channelSet, nil
}

채널 목록 및 MakeSessionDataStore()으로 세션별 데이터 저장소를 생성하고 for~switch loop으로 채널에서 데이터를 읽어와 처리한다.


전체 코드는 https://github.com/jhseoeo/webrtc-mesh-server에서 찾아볼 수 있다.



후기

many-to-many 만들 때 시그널링 메시지를 브로드캐스팅으로 보내버려서 피어 수가 3명이 넘어가면 연결이 제대로 안됐었는데, 뭐가 문제인지 찾는 데 한참 걸렸다.

개인적으로 WebRTC 좀 친다고 생각했었는데 아직 갈 길이 먼 것 같다..

© 2024 JHSeo. All right reserved.